In [1]:
%matplotlib inline
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
torch.manual_seed(1)
import numpy as np
from tqdm import tqdm
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
sns.set_context("poster")
sns.set_style("ticks")
In [2]:
a = np.random.rand(10)
a
Out[2]:
In [3]:
a_tch = torch.Tensor(a)
a_tch
Out[3]:
In [4]:
a_tch[0]
Out[4]:
In [5]:
a_tch = torch.rand((2,3,4))
a_tch
Out[5]:
In [6]:
x = torch.Tensor([[0, 1.], [2, 3]])
y = torch.Tensor([[1, 4.], [5, 3]])
x+y
Out[6]:
In [7]:
torch.cat([x,y], 1)
Out[7]:
In [8]:
x.view((1,-1))
Out[8]:
In [9]:
x = Variable(torch.Tensor([1,2]), requires_grad=True)
print(x.data)
In [10]:
x
Out[10]:
In [11]:
y = Variable(torch.Tensor([3,5]), requires_grad=True)
z = x+y
z
Out[11]:
In [12]:
z.creator
Out[12]:
In [13]:
s = z.sum()
s.creator
Out[13]:
In [14]:
s.backward()
print(x.grad)
In [15]:
## Tensors are data objects
x = torch.rand((2,2))
y = torch.rand((2,2))
z = x+y
## Variables wrap Tensors and additionally hold the gradient and computational-graph details
var_x = Variable(x)
var_y = Variable(y)
var_z = var_x + var_y
print(var_z.creator)
## I am breaking the chain of the variable by creating a new variable with the same data.
## Gradients will not propagate
var_z_data = var_z.data
new_var_z = Variable(var_z_data)
print(new_var_z.creator)
In [16]:
x.size()
Out[16]:
In [17]:
var_x.data.size()
Out[17]:
In [18]:
torch.randn(1)
Out[18]:
In [19]:
torch.randn([1,2]).expand_as(torch.randn((2,2))) + torch.randn((2,2))
Out[19]:
In [20]:
var_x.transpose(1,0)
Out[20]:
In [21]:
l1 = nn.Linear(x.size(1), 4)
h1 = l1(var_x)
F.relu(h1)
Out[21]:
In [22]:
F.softmax(h1)
Out[22]:
In [23]:
F.softmax(h1).sum(1)
Out[23]:
In [24]:
data = [("me gusta comer en la cafeteria".split(), "SPANISH"),
        ("Give it to me".split(), "ENGLISH"),
        ("No creo que sea una buena idea".split(), "SPANISH"),
        ("No it is not a good idea to get lost at sea".split(), "ENGLISH")]
test_data = [("Yo creo que si".split(), "SPANISH"),
             ("it is lost on me".split(), "ENGLISH")]
In [25]:
class Vocab(object):
    def __init__(self, name="vocab",
                 offset_items=tuple([]),
                 UNK=None):
        self.name = name
        self.item2idx = {}
        self.idx2item = []
        self.size = 0
        self.UNK = UNK
        self.batch_add(offset_items)
        if UNK is not None:
            self.add(UNK)
            self.UNK_ID = self.item2idx[self.UNK]
        self.offset = self.size
    def add(self, item):
        if item not in self.item2idx:
            self.item2idx[item] = self.size
            self.size += 1
            self.idx2item.append(item)
    def batch_add(self, items):
        for item in items:
            self.add(item)
    def getidx(self, item):
        if item not in self.item2idx:
            if self.UNK is None:
                raise RuntimeError("UNK is not defined. %s not in vocab." % item)
            return self.UNK_ID
        return self.item2idx[item]
    def __repr__(self):
        return "Vocab(name={}, size={:d}, UNK={}, offset={:d})".format(
            self.name, self.size,
            self.UNK, self.offset
        )
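A quick sanity check of the UNK fallback in Vocab.getidx (a small sketch of my own, not a cell from the original notebook; the toy items are made up):

toy_vocab = Vocab("demo", UNK="UNK")
toy_vocab.batch_add(["foo", "bar"])
print(toy_vocab.getidx("foo"))  # index assigned when "foo" was added
print(toy_vocab.getidx("baz"))  # unseen item falls back to toy_vocab.UNK_ID
# Vocab("no_unk").getidx("baz") would raise RuntimeError because UNK is None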
In [26]:
word_vocab = Vocab("words", UNK="UNK")
label_vocab = Vocab("labels")
for sent, label in data:
    label_vocab.add(label)
    for word in sent:
        word_vocab.add(word)
print(word_vocab, word_vocab.idx2item, word_vocab.item2idx)
print(label_vocab, label_vocab.idx2item, label_vocab.item2idx)
In [27]:
class BoWModule(nn.Module):
    def __init__(self, input_size, output_size):
        super(BoWModule, self).__init__()
        self.W = nn.Linear(input_size, output_size)
    def forward(self, X):
        return F.log_softmax(self.W(X))
In [28]:
class Seq2Vec(object):
    def __init__(self, vocab):
        self.vocab = vocab
    def encode(self, seq):
        vec = []
        for item in seq:
            vec.append(self.vocab.getidx(item))
        return vec
    def batch_encode(self, seq_batch):
        vecs = [self.encode(seq) for seq in seq_batch]
        return vecs

class Seq2OneHot(object):
    def __init__(self, size):
        self.size = size
    def encode(self, x, as_variable=False):
        one_hot = torch.zeros(self.size)
        for i in x:
            one_hot[i] += 1
        one_hot = one_hot.view(1, -1)
        if as_variable:
            return Variable(one_hot)
        return one_hot
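Note that, despite its name, Seq2OneHot.encode produces bag-of-words counts rather than a strict one-hot vector: repeated indices are counted. A tiny check (my addition, with made-up indices):

toy_onehot = Seq2OneHot(6)
print(toy_onehot.encode([2, 2, 5]))  # index 2 gets a count of 2, index 5 a count of 1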
In [29]:
sent_encoder = Seq2Vec(word_vocab)
def data2vec(data):
    X, y = [], []
    for sent, label in data:
        y.append([label_vocab.getidx(label)])
        X.append(sent_encoder.encode(sent))
    return X, y
X_train, y_train = data2vec(data)
X_test, y_test = data2vec(test_data)
print(X_train, y_train, X_test, y_test)
sent_onehot_encoder = Seq2OneHot(word_vocab.size)
print(sent_onehot_encoder.encode(X_train[0]))
In [30]:
model = BoWModule(word_vocab.size, label_vocab.size)
log_probs = model.forward(sent_onehot_encoder.encode(X_train[0], as_variable=True))
print("Log probs: ", log_probs)
In [31]:
def print_log_probs(log_probs, label_vocab, label_true=None):
    for i, label_probs in enumerate(log_probs.data.tolist()):
        prob_string = ", ".join([
            "{}: {:.3f}".format(label_vocab.idx2item[j], val)
            for j, val in enumerate(label_probs)
        ])
        true_string = "?"
        if label_true is not None:
            true_string = label_vocab.idx2item[label_true[i]]
        print(prob_string, "True label: ", true_string)
In [32]:
print_log_probs(log_probs, label_vocab)
In [33]:
for seq, label in zip(X_test, y_test):
    log_probs = model.forward(sent_onehot_encoder.encode(seq, as_variable=True))
    print_log_probs(log_probs, label_vocab, label_true=label)
In [34]:
next(model.parameters())[:, word_vocab.getidx("creo")]
Out[34]:
In [35]:
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
def get_loss(loss_function, seq, label, verbose=False):
    ## Clear gradients before every update, otherwise they accumulate across steps
    model.zero_grad()
    ## Do forward pass
    log_probs = model.forward(sent_onehot_encoder.encode(seq, as_variable=True))
    if verbose:
        print(log_probs)
    ## Get labels
    target = Variable(torch.LongTensor(label))
    if verbose:
        print(target)
    ## Get loss
    loss = loss_function(log_probs, target)
    if verbose:
        print(loss)
    return loss
In [36]:
loss = get_loss(loss_function, X_train[0], y_train[0], verbose=True)
In [37]:
loss.data[0]
Out[37]:
In [38]:
losses = []
for epoch in tqdm(range(100)):
    for seq, label in zip(X_train, y_train):
        loss = get_loss(loss_function, seq, label)
        losses.append(loss.data[0])
        ## Get gradients of model params wrt. loss
        loss.backward()
        ## Optimize the loss by one step
        optimizer.step()
In [39]:
plt.plot(losses)
plt.xlabel("Step")
plt.ylabel("Loss")
Out[39]:
In [40]:
for seq, label in zip(X_test, y_test):
    log_probs = model.forward(sent_onehot_encoder.encode(seq, as_variable=True))
    print_log_probs(log_probs, label_vocab, label_true=label)
In [41]:
word_embeddings = nn.Embedding(word_vocab.size, 5)
word_embeddings
Out[41]:
In [42]:
word_embeddings(Variable(torch.LongTensor([X_test[0]])))
Out[42]:
In [43]:
X_test[0]
Out[43]:
In [44]:
word_embeddings(Variable(torch.LongTensor([X_test[0]]))).mean(1).view(-1, 5)
Out[44]:
In [45]:
word_embeddings.embedding_dim
Out[45]:
In [46]:
class BoEmbeddingsModule(nn.Module):
    def __init__(self, vocab_size, embedding_size, output_size):
        super(BoEmbeddingsModule, self).__init__()
        self.word_embeddings = nn.Embedding(vocab_size, embedding_size)
        self.W = nn.Linear(embedding_size, output_size)
    def forward(self, X):
        hidden_layer = self.word_embeddings(X).mean(1).view(-1, self.word_embeddings.embedding_dim)
        return F.log_softmax(self.W(hidden_layer))
In [47]:
model = BoEmbeddingsModule(word_vocab.size, 5, label_vocab.size)
log_probs = model.forward(Variable(torch.LongTensor([X_test[0]])))
print("Log probs: ", log_probs)
In [48]:
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
def get_loss_embedding(loss_function, seq, label, verbose=False):
    ## Clear gradients before every update, otherwise they accumulate across steps
    model.zero_grad()
    ## Do forward pass
    log_probs = model.forward(Variable(torch.LongTensor([seq])))
    if verbose:
        print(log_probs)
    ## Get labels
    target = Variable(torch.LongTensor(label))
    if verbose:
        print(target)
    ## Get loss
    loss = loss_function(log_probs, target)
    if verbose:
        print(loss)
    return loss
In [49]:
loss = get_loss_embedding(loss_function, X_train[0], y_train[0], verbose=True)
In [50]:
losses = []
for epoch in tqdm(range(100)):
    for seq, label in zip(X_train, y_train):
        loss = get_loss_embedding(loss_function, seq, label)
        losses.append(loss.data[0])
        ## Get gradients of model params wrt. loss
        loss.backward()
        ## Optimize the loss by one step
        optimizer.step()
plt.plot(losses)
plt.xlabel("Step")
plt.ylabel("Loss")
Out[50]:
In [51]:
word_embeddings(Variable(torch.LongTensor([X_test[0]])))
Out[51]:
In [52]:
word_embeddings(Variable(torch.LongTensor([X_test[0]]))).permute(1, 0, 2)
Out[52]:
In [53]:
lstm = nn.LSTM(5,2)
input_units = word_embeddings(Variable(torch.LongTensor([X_test[0]]))).permute(1, 0, 2)
hidden_units = (Variable(torch.randn(1,1,2)), Variable(torch.randn((1,1,2))))
In [54]:
lstm
Out[54]:
In [55]:
input_units
Out[55]:
In [56]:
hidden_units
Out[56]:
In [57]:
input_units[:1, :, :]
Out[57]:
In [58]:
Variable(torch.randn((1,5))).view(1,1,-1)
Out[58]:
In [59]:
lstm(input_units[:1, :, :], hidden_units)
Out[59]:
In [60]:
lstm(input_units, hidden_units)
Out[60]:
In [61]:
out, hidden = lstm(input_units, hidden_units)
out, hidden
Out[61]:
In [62]:
out[-1, :, :]
Out[62]:
In [63]:
F.log_softmax(out[-1, :, :])
Out[63]:
In [64]:
class LSTMPredictor(nn.Module):
    def __init__(self, vocab_size, embedding_size, hidden_size, output_size):
        super(LSTMPredictor, self).__init__()
        self.word_embeddings = nn.Embedding(vocab_size, embedding_size)
        self.lstm = nn.LSTM(embedding_size, hidden_size)
        self.output = nn.Linear(hidden_size, output_size)
    def forward(self, X):
        seq_embed = self.word_embeddings(X).permute(1, 0, 2)
        out, hidden = self.lstm(seq_embed)
        output = self.output(out[-1, :, :])
        return F.log_softmax(output)
In [65]:
model = LSTMPredictor(word_vocab.size, 5, 3, label_vocab.size)
log_probs = model.forward(Variable(torch.LongTensor([X_test[0]])))
print("Log probs: ", log_probs)
In [66]:
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
def get_loss_embedding_lstm(model, loss_function, seq, label, verbose=False):
    ## Clear gradients before every update, otherwise they accumulate across steps
    model.zero_grad()
    ## Do forward pass
    log_probs = model.forward(Variable(torch.LongTensor([seq])))
    if verbose:
        print(log_probs)
    ## Get labels
    target = Variable(torch.LongTensor(label))
    if verbose:
        print(target)
    ## Get loss
    loss = loss_function(log_probs, target)
    if verbose:
        print(loss)
    return loss
In [67]:
loss = get_loss_embedding_lstm(model, loss_function, X_train[0], y_train[0], verbose=True)
In [68]:
losses = []
for epoch in tqdm(range(100)):
    for seq, label in zip(X_train, y_train):
        loss = get_loss_embedding_lstm(model, loss_function, seq, label)
        losses.append(loss.data[0])
        ## Get gradients of model params wrt. loss
        loss.backward()
        ## Optimize the loss by one step
        optimizer.step()
plt.plot(losses)
plt.xlabel("Step")
plt.ylabel("Loss")
Out[68]:
In [69]:
training_data = [
    ("The dog ate the apple".split(), ["DET", "NN", "V", "DET", "NN"]),
    ("Everybody read that book".split(), ["NN", "V", "DET", "NN"])
]
In [70]:
word_vocab = Vocab("words", UNK="UNK")
pos_vocab = Vocab("pos_tags")
for sent, pos_tags in training_data:
    for word, pos in zip(sent, pos_tags):
        word_vocab.add(word)
        pos_vocab.add(pos)
print(word_vocab, word_vocab.idx2item, word_vocab.item2idx)
print(pos_vocab, pos_vocab.idx2item, pos_vocab.item2idx)
In [71]:
sent_encoder = Seq2Vec(word_vocab)
pos_encoder = Seq2Vec(pos_vocab)
def dataseq2vec(data):
    X, Y = [], []
    for sent, tags in data:
        X.append(sent_encoder.encode(sent))
        Y.append(pos_encoder.encode(tags))
    return X, Y
X_train, Y_train = dataseq2vec(training_data)
print(X_train, Y_train)
In [72]:
class LSTMTagger(nn.Module):
    def __init__(self, vocab_size, embedding_size, hidden_size, output_size):
        super(LSTMTagger, self).__init__()
        self.word_embeddings = nn.Embedding(vocab_size, embedding_size)
        self.lstm = nn.LSTM(embedding_size, hidden_size)
        self.output = nn.Linear(hidden_size, output_size)
    def forward(self, X):
        seq_embed = self.word_embeddings(X).permute(1, 0, 2)
        out, hidden = self.lstm(seq_embed)
        # Collapse the LSTM output to (seq_len, hidden_size) so the linear layer
        # produces one row of label scores per token, i.e. shape (seq_len, label_size)
        output = self.output(out.view(X.data.size(1), -1))
        return F.log_softmax(output)
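To make the reshaping above concrete, here is a small shape check (my addition, not a cell from the original notebook), assuming the tagging word_vocab and X_train defined in the previous cells:

_tagger = LSTMTagger(word_vocab.size, 5, 3, pos_vocab.size)
_X = Variable(torch.LongTensor([X_train[0]]))
_embed = _tagger.word_embeddings(_X).permute(1, 0, 2)  # (seq_len, batch=1, embedding_size)
_out, _ = _tagger.lstm(_embed)                         # (seq_len, batch=1, hidden_size)
print(_embed.size(), _out.size(), _out.view(_X.data.size(1), -1).size())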
In [73]:
word_vocab
Out[73]:
In [74]:
model = LSTMTagger(word_vocab.size, 5, 3, pos_vocab.size)
log_probs = model.forward(Variable(torch.LongTensor([X_train[0]])))
print("Log probs: ", log_probs)
In [75]:
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
def get_loss_embedding_lstm_tagger(model, loss_function, seq, label, verbose=False):
    ## Clear gradients before every update, otherwise they accumulate across steps
    model.zero_grad()
    ## Do forward pass
    log_probs = model.forward(Variable(torch.LongTensor([seq])))
    if verbose:
        print(log_probs)
    ## Get labels
    target = Variable(torch.LongTensor(label))
    if verbose:
        print(target)
    ## Get loss
    loss = loss_function(log_probs, target)
    if verbose:
        print(loss)
    return loss
In [76]:
loss = get_loss_embedding_lstm_tagger(model, loss_function, X_train[0], Y_train[0], verbose=True)
In [77]:
losses = []
for epoch in tqdm(range(100)):
    for seq, label in zip(X_train, Y_train):
        loss = get_loss_embedding_lstm_tagger(model, loss_function, seq, label)
        losses.append(loss.data[0])
        ## Get gradients of model params wrt. loss
        loss.backward()
        ## Optimize the loss by one step
        optimizer.step()
plt.plot(losses)
plt.xlabel("Step")
plt.ylabel("Loss")
Out[77]:
In [78]:
word_vocab = Vocab("words", UNK="UNK")
char_vocab = Vocab("chars", UNK="<U>")
pos_vocab = Vocab("pos_tags")
for sent, pos_tags in training_data:
    for word, pos in zip(sent, pos_tags):
        word_vocab.add(word)
        pos_vocab.add(pos)
        char_vocab.batch_add(word)
print(word_vocab, word_vocab.idx2item, word_vocab.item2idx)
print(char_vocab, char_vocab.idx2item, char_vocab.item2idx)
print(pos_vocab, pos_vocab.idx2item, pos_vocab.item2idx)
In [79]:
sent_encoder = Seq2Vec(word_vocab)
char_encoder = Seq2Vec(char_vocab)
pos_encoder = Seq2Vec(pos_vocab)
def dataseqchar2vec(data):
    X, X_char, Y = [], [], []
    for sent, tags in data:
        X.append(sent_encoder.encode(sent))
        X_char.append(char_encoder.batch_encode(sent))
        Y.append(pos_encoder.encode(tags))
    return X, X_char, Y
X_train, X_char_train, Y_train = dataseqchar2vec(training_data)
print(X_train, X_char_train, Y_train)
In [80]:
class CharEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_size,
                 out_channels, kernel_sizes, dropout=0.5):
        super(CharEmbedding, self).__init__()
        self.char_embeddings = nn.Embedding(vocab_size, embedding_size)
        # Wrap the convolutions in nn.ModuleList so they are registered as sub-modules
        # and their parameters show up in model.parameters() during optimization
        self.convs1 = nn.ModuleList([nn.Conv2d(1, out_channels, (K, embedding_size))
                                     for K in kernel_sizes])
        self.dropout = nn.Dropout(dropout)
    def forward(self, X):
        x = self.char_embeddings(X)
        # Ref: https://github.com/Shawn1993/cnn-text-classification-pytorch/blob/master/model.py
        x = x.unsqueeze(1)  # (N, Ci, W, D)
        x = [F.relu(conv(x)).squeeze(3) for conv in self.convs1]  # [(N, Co, W), ...]*len(Ks)
        x = [F.max_pool1d(self.dropout(i), i.size(2)).squeeze(2) for i in x]  # [(N, Co), ...]*len(Ks)
        x = torch.cat(x, 1)
        return self.dropout(x)
In [81]:
X_char_train[0][2]
Out[81]:
In [82]:
char_embedding = CharEmbedding(char_vocab.size, 5, 4, [2, 3])
char_embedding(Variable(torch.LongTensor([X_char_train[0][2]])))
Out[82]:
In [83]:
torch.cat([
    char_embedding(Variable(torch.LongTensor([x]))).unsqueeze(0)
    for x in X_char_train[0]
], 1)
Out[83]:
In [84]:
class WordCharEmbedding(nn.Module):
    def __init__(self, vocab_size, embedding_size,
                 char_embedding_model, dropout=0.5):
        super(WordCharEmbedding, self).__init__()
        self.char_embeddings = char_embedding_model
        self.word_embeddings = nn.Embedding(vocab_size, embedding_size)
        self.dropout = nn.Dropout(dropout)
    def forward(self, X, X_char=None):
        # Ref: https://github.com/Shawn1993/cnn-text-classification-pytorch/blob/master/model.py
        word_vecs = self.word_embeddings(X)
        if X_char is not None:
            char_vecs = torch.cat([
                self.char_embeddings(x).unsqueeze(0)
                for x in X_char
            ], 1)
            word_vecs = char_vecs + word_vecs
        return self.dropout(word_vecs)
In [85]:
Variable(torch.Tensor(X_train[0]))
Out[85]:
In [86]:
def charseq2varlist(X_chars):
    return [Variable(torch.LongTensor([x])) for x in X_chars]
In [87]:
word_char_embedding = WordCharEmbedding(word_vocab.size, 8, char_embedding, dropout=0)
word_char_embedding(Variable(torch.LongTensor([X_train[0]])), charseq2varlist(X_char_train[0]))
Out[87]:
In [88]:
class LSTMTaggerWordChar(nn.Module):
    def __init__(self, word_char_embedding, embedding_size, hidden_size, output_size):
        super(LSTMTaggerWordChar, self).__init__()
        self.word_embeddings = word_char_embedding
        self.lstm = nn.LSTM(embedding_size, hidden_size//2, bidirectional=True)
        self.output = nn.Linear(hidden_size, output_size)
    def forward(self, X, X_char):
        seq_embed = self.word_embeddings(X, X_char).permute(1, 0, 2)
        out, hidden = self.lstm(seq_embed)
        # Collapse the LSTM output to (seq_len, hidden_size) so the linear layer
        # emits per-token label scores of shape (seq_len, label_size)
        output = self.output(out.view(X.data.size(1), -1))
        return F.log_softmax(output)
In [89]:
model = LSTMTaggerWordChar(word_char_embedding, 8, 4, pos_vocab.size)
log_probs = model.forward(Variable(torch.LongTensor([X_train[0]])), charseq2varlist(X_char_train[0]))
print("Log probs: ", log_probs)
In [90]:
loss_function = nn.NLLLoss()
optimizer = optim.SGD(model.parameters(), lr=0.1)
def get_loss_embedding_lstm_tagger_word_char(model, loss_function, seq, seq_char, label, verbose=False):
    ## Clear gradients before every update, otherwise they accumulate across steps
    model.zero_grad()
    ## Do forward pass
    log_probs = model.forward(Variable(torch.LongTensor([seq])),
                              charseq2varlist(seq_char))
    if verbose:
        print(log_probs)
    ## Get labels
    target = Variable(torch.LongTensor(label))
    if verbose:
        print(target)
    ## Get loss
    loss = loss_function(log_probs, target)
    if verbose:
        print(loss)
    return loss
In [91]:
loss = get_loss_embedding_lstm_tagger_word_char(model, loss_function, X_train[0], X_char_train[0], Y_train[0], verbose=True)
In [92]:
losses = []
for epoch in tqdm(range(100)):
    for seq, seq_char, label in zip(X_train, X_char_train, Y_train):
        loss = get_loss_embedding_lstm_tagger_word_char(model, loss_function, seq, seq_char, label)
        losses.append(loss.data[0])
        ## Get gradients of model params wrt. loss
        loss.backward()
        ## Optimize the loss by one step
        optimizer.step()
plt.plot(losses)
plt.xlabel("Step")
plt.ylabel("Loss")
Out[92]:
In [93]:
def to_scalar(var):
    # returns a python float
    return var.view(-1).data.tolist()[0]

def argmax(vec):
    # return the argmax as a python int
    _, idx = torch.max(vec, 1)
    return to_scalar(idx)

def log_sum_exp_torch(vecs, axis=-1):
    ## Use help from: http://pytorch.org/tutorials/beginner/nlp/advanced_tutorial.html#sphx-glr-beginner-nlp-advanced-tutorial-py
    if axis < 0:
        axis = vecs.ndimension() + axis
    max_val, _ = vecs.max(axis)
    vecs = vecs - max_val.expand_as(vecs)
    out_val = torch.log(torch.exp(vecs).sum(axis))
    # print(max_val, out_val)
    return max_val + out_val

log_sum_exp_torch(Variable(torch.Tensor([[1, 2, 3, 4]])), axis=-1)
Out[93]:
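As a quick numerical sanity check (my addition, not a cell in the original notebook), the same quantity can be computed directly with NumPy; both should give roughly log(e^1 + e^2 + e^3 + e^4) ≈ 4.44:

print(np.log(np.exp(np.array([1., 2., 3., 4.])).sum()))  # ~4.4402, matching the torch result above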
In [94]:
class CRFLayer(nn.Module):
    def __init__(self, num_labels):
        super(CRFLayer, self).__init__()
        self.num_labels = num_labels
        self.transitions = nn.Parameter(torch.randn(self.num_labels, self.num_labels))

    def _forward_alg(self, emissions):
        scores = emissions[0]
        # Get the log sum exp score
        transitions = self.transitions.transpose(-1, -2)
        for i in range(1, emissions.size(0)):
            scores = emissions[i] + log_sum_exp_torch(
                scores.expand_as(transitions) + transitions,
                axis=1)
        return log_sum_exp_torch(scores, axis=-1)

    def _score_sentence(self, emissions, tags):
        score = emissions[0][tags[0]]
        for i, emission in enumerate(emissions[1:]):
            score = score + self.transitions[tags[i], tags[i+1]] + emission[tags[i+1]]
        return score

    def _viterbi_decode(self, emissions):
        scores = torch.zeros(emissions.size(1))
        back_pointers = torch.zeros(emissions.size()).int()
        scores = scores + emissions[0]
        transitions = self.transitions
        # Generate most likely scores and paths for each step in sequence
        for i in range(1, emissions.size(0)):
            scores_with_transitions = scores.unsqueeze(1).expand_as(transitions) + transitions
            max_scores, back_pointers[i] = torch.max(scores_with_transitions, 0)
            scores = emissions[i] + max_scores
        # Generate the most likely path
        viterbi = [scores.numpy().argmax()]
        back_pointers = back_pointers.numpy()
        for bp in reversed(back_pointers[1:]):
            viterbi.append(bp[viterbi[-1]])
        viterbi.reverse()
        viterbi_score = scores.numpy().max()
        return viterbi_score, viterbi

    def neg_log_likelihood(self, feats, tags):
        forward_score = self._forward_alg(feats)
        gold_score = self._score_sentence(feats, tags)
        return forward_score - gold_score

    def forward(self, feats):
        # Find the best path, given the features.
        score, tag_seq = self._viterbi_decode(feats)
        return score, tag_seq
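The CRF negative log-likelihood is the log partition function minus the score of the gold path. The toy check below (my addition, with made-up emissions and tags) just confirms the pieces compose into a scalar loss:

toy_crf = CRFLayer(num_labels=pos_vocab.size)
toy_emissions = Variable(torch.randn(4, pos_vocab.size))  # 4 time steps of per-label scores
toy_tags = torch.LongTensor([0, 1, 2, 1])                 # an arbitrary gold tag sequence
print(toy_crf._forward_alg(toy_emissions))                # log Z(x)
print(toy_crf._score_sentence(toy_emissions, toy_tags))   # score of the gold path
print(toy_crf.neg_log_likelihood(toy_emissions, toy_tags))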
In [95]:
class LSTMTaggerWordCharCRF(nn.Module):
    def __init__(self, word_char_embedding, embedding_size, hidden_size, output_size):
        super(LSTMTaggerWordCharCRF, self).__init__()
        self.word_embeddings = word_char_embedding
        self.lstm = nn.LSTM(embedding_size, hidden_size//2, bidirectional=True)
        self.output = nn.Linear(hidden_size, output_size)
        self.crf = CRFLayer(output_size)
    def forward(self, X, X_char):
        seq_embed = self.word_embeddings(X, X_char).permute(1, 0, 2)
        out, hidden = self.lstm(seq_embed)
        # Collapse the LSTM output to (seq_len, hidden_size) so the linear layer
        # emits per-token label scores of shape (seq_len, label_size)
        output = self.output(out.view(X.data.size(1), -1))
        return output
    def loss(self, X, X_char, Y):
        feats = self.forward(X, X_char)
        return self.crf.neg_log_likelihood(feats, Y)
In [96]:
model = LSTMTaggerWordCharCRF(word_char_embedding, 8, 4, pos_vocab.size)
features = model.forward(Variable(torch.LongTensor([X_train[0]])), charseq2varlist(X_char_train[0]))
print("Features: ", features)
In [97]:
optimizer = optim.SGD([
    {"params": model.parameters()},
], lr=0.1)
def get_loss_embedding_lstm_tagger_word_char_crf(model, seq, seq_char, label, verbose=False):
    ## Clear gradients before every update, otherwise they accumulate across steps
    model.zero_grad()
    ## Do forward pass
    X, X_char = Variable(torch.LongTensor([seq])), charseq2varlist(seq_char)
    ## Get labels
    target = torch.LongTensor(label)
    if verbose:
        print(target)
    ## Get loss
    loss = model.loss(X, X_char, target)
    if verbose:
        print(loss)
    return loss
In [98]:
loss = get_loss_embedding_lstm_tagger_word_char_crf(model, X_train[0], X_char_train[0], Y_train[0], verbose=True)
In [99]:
losses = []
for epoch in tqdm(range(100)):
    for seq, seq_char, label in zip(X_train, X_char_train, Y_train):
        loss = get_loss_embedding_lstm_tagger_word_char_crf(model, seq, seq_char, label)
        losses.append(loss.data[0])
        ## Get gradients of model params wrt. loss
        loss.backward()
        ## Optimize the loss by one step
        optimizer.step()
plt.plot(losses)
plt.xlabel("Step")
plt.ylabel("Loss")
Out[99]:
In [ ]: